feat: Prunable shard specs for streaming published segments#19571
Conversation
…gestion
Adds a new core ShardSpec (stream_range) that lets Kafka streaming tasks
declare, per published segment, the distinct values observed for configured
partitionFilterDimensions. The broker uses these to prune segments whose
declared values cannot match a query filter — enabling near-realtime pruning
without waiting for compaction.
Highlights:
- StreamRangeShardSpec extends NumberedShardSpec; possibleInDomain prunes by
per-value range intersection. Null is declared as a first-class value
(encoded as Range.lessThan("")) so IS NULL queries are never wrongly pruned,
and is kept distinct from the empty string.
- Opt-in via partitionFilterDimensions on the Kafka supervisor/IOConfig
(null by default; segments otherwise get a plain NumberedShardSpec). Kafka
only for now; backward-compatible config (old specs/constructors unchanged).
- Per-segment value accumulation at ingest time; each segment is stamped with
only its own observed values at publish.
- Correctness guards: restart-spanning segments fall back to NumberedShardSpec
(pre-restart rows are not re-read, so their values can't be fully observed);
dimensions that observed a null/missing value declare null so IS NULL is not
pruned.
- BaseAppenderatorDriver reconciles the returned SegmentsAndCommitMetadata to
the published shard specs so handoff/publish logs report the real spec.
Tests:
- StreamRangeShardSpecTest: possibleInDomain matrix incl. null vs "" and serde.
- SeekableStreamIndexTaskRunnerTest: annotator unit tests (restart fallback,
null handling).
- EmbeddedStreamRangeShardSpecTest: end-to-end pruning verified via the
query/segment/time scan metric across a predicate matrix (=, !=, IN, NOT IN,
IS NULL, IS NOT NULL, multi-value, untracked dimension, non-existent value),
plus a no-partitioning control twin and in-memory/graceful-widening cases.
- StreamAppenderatorDriverTest: returned metadata carries the published spec.
|
|
||
| // annotateSegmentWithPartitionFilters is a no-op (returns the segment unchanged) when partition filters are not | ||
| // configured, so it is always safe to apply here. | ||
| final java.util.function.Function<Set<DataSegment>, Set<DataSegment>> shardSpecAnnotator = |
There was a problem hiding this comment.
nit: can add Function to imports
|
|
||
| for (DataSegment segment : publishedSegmentsAndCommitMetadata.getSegments()) { | ||
| observedDimensionValuesBySegment.remove( | ||
| SegmentIdWithShardSpec.fromDataSegment(segment).toString() |
There was a problem hiding this comment.
Should we also clean up restartSpannedSegments.remove here?
| return s; | ||
| } | ||
| final Map<String, List<String>> snapshotFilters = new HashMap<>(); | ||
| for (String dim : filterDims) { |
There was a problem hiding this comment.
for (String dim : filterDims) {
segObserved.computeIfPresent(dim, (k, vals) -> {
synchronized (vals) {
if (!vals.isEmpty()) {
snapshotFilters.put(dim, new ArrayList<>(vals));
}
}
return vals; // Return unchanged - we're just reading
});
}
There was a problem hiding this comment.
Claude recommendation for race condition
| |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds.|No|100| | ||
| |`useEarliestOffset`|Boolean|If a supervisor is managing a datasource for the first time, it obtains a set of starting offsets from Kafka. This flag determines whether the supervisor retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks start from where the previous segments ended so this flag is only used on the first run.|No|`false`| | ||
| |`idleConfig`|Object|Defines how and when the Kafka supervisor can become idle. See [Idle configuration](#idle-configuration) for more details.|No|null| | ||
| |`partitionFilterDimensions`|List of String|Dimensions to track for query-time segment pruning. See [Partition filter dimensions](#partition-filter-dimensions) for details.|No|null| |
There was a problem hiding this comment.
What do you think about naming this partitionDimensions to align with the compaction config? That may make it more clear that those values should be in sync
There was a problem hiding this comment.
Yes, good call - thanks for the suggestion! I also moved this into the tuningConfig for consistency. It's nested under a streamingPartitionsSpec container to avoid naming ambiguity and so we can extend it with additional properties and/or add new types in the future if needed.
| - Use only low-to-medium cardinality dimensions (for example, `tenant_id`, `region`, `environment`). High-cardinality dimensions bloat segment metadata with no pruning benefit. | ||
| - Most effective when Kafka partitions are keyed by the tracked dimension (for example, using tenant ID as the message key). Each task naturally sees a subset of values, and segments get tight filter annotations. | ||
| - Also works with multiple supervisors reading from separate topics into one datasource. | ||
| - After compaction, the `StreamRangeShardSpec` annotations are replaced by the compaction output's shard spec (hash or range partitioning), which provides its own pruning. |
There was a problem hiding this comment.
Maybe worth mentioning that when using partitionFilterDimensions, dynamic compaction strategy should not be used
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 0 |
| P3 | 1 |
| Total | 2 |
Reviewed 16 of 16 changed files.
I found two issues: restart-spanning segments can mix shard spec classes within one publish interval and fail publishing, and Kafka backfill specs drop partitionFilterDimensions.
This is an automated review by Codex GPT-5.5
| return s; | ||
| } | ||
| final String lookupKey = SegmentIdWithShardSpec.fromDataSegment(s).toString(); | ||
| if (restartSpannedSegments.contains(lookupKey)) { |
There was a problem hiding this comment.
P1 Mixed shard specs can fail publish after restart
Restart-spanned segments return unchanged as NumberedShardSpec, while new same-interval segments in the same publish batch can be annotated as StreamRangeShardSpec. TransactionalSegmentPublisher then runs SegmentPublisherHelper.annotateShardSpec, which rejects mixed shard-spec classes per interval, so a restarted task can fail publish/handoff. Make the fallback interval-wide, or stamp restored segments with a non-pruning stream_range spec.
There was a problem hiding this comment.
Thanks, this is fixed with some test coverage added.
| emitTimeLagMetrics, | ||
| serverPriorityToReplicas, | ||
| boundedStreamConfig, | ||
| null |
There was a problem hiding this comment.
P3 Backfill specs drop partitionFilterDimensions
This compatibility constructor always forwards null for partitionFilterDimensions. KafkaSupervisorSpec.createBackfillSpec still uses this overload when deriving bounded backfill specs, so a supervisor configured with partitionFilterDimensions silently creates backfill tasks without the pruning annotations. Pass the existing dimension list through for backfill specs.
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 0 |
| P3 | 1 |
| Total | 1 |
Reviewed 19 of 19 changed files.
This is an automated review by Codex GPT-5.5
| if (vals.isEmpty()) { | ||
| continue; | ||
| } | ||
| snapshot = new ArrayList<>(vals); |
There was a problem hiding this comment.
[P3] Sort observed partition values before publishing
The list stored in partitionDimensionValues is created directly from a HashSet, so its order is unspecified. The embedded test already asserts a concrete order for these values, and equivalent published segment metadata can vary by JVM or run even when the value set is identical. Sort the snapshot deterministically, with explicit null handling, before putting it into the shard spec.
Reviewed 19 of 19 changed files.
| }, | ||
| "tuningConfig": { | ||
| "type": "kafka", | ||
| "streamingPartitionsSpec": {"partitionDimensions": ["tenant"]} |
There was a problem hiding this comment.
Nice! Makes sense to have this in tuningConfig, and in the future we can add cardinality guardrails into streamingPartitionsSpec as well
| ); | ||
| Assert.assertEquals(List.of("tenant", "region"), config.getStreamingPartitionsSpec().getPartitionDimensions()); | ||
| } | ||
|
|
There was a problem hiding this comment.
Might be worth having a test case which covers edge case configuration for partitionDimensions (e.g. null values, empty string, integers)
There was a problem hiding this comment.
Might be worth having a test case which covers edge case configuration for partitionDimensions (e.g. null values, empty string
I've added some tests in this unit test and there's some existing end-to-end coverage in the embedded ones too for null, empty cases
Re numeric types, they're currently unsupported for the range shard spec as well, given how the filters work - some details here: #19415. For now it's just documented for this new shard spec, but it would be nice to expand that functionality generally so all shard specs benefit from it; if that's not trivial, we could block them at creation time at least for this new shard spec in the future
| snapshot.sort(Comparator.nullsFirst(Comparator.naturalOrder())); | ||
| snapshotFilters.put(dim, snapshot); | ||
| } | ||
| } |
There was a problem hiding this comment.
For a follow up, it may be useful to emit metric here for the cardinality of observedPartitionDimValuesBySegment.get(segmentId)
| final TransactionalSegmentPublisher publisher, | ||
| final Committer committer, | ||
| final Collection<String> sequenceNames, | ||
| final java.util.function.Function<Set<DataSegment>, Set<DataSegment>> segmentAnnotateFunction |
There was a problem hiding this comment.
java.util.function.Function can also be imported since it's used twice
There was a problem hiding this comment.
Yeah, I tried using Guava's Function here (which is used in the class below), but it doesn't seem to fit, hence the inline java.util.function.Function
aho135
left a comment
There was a problem hiding this comment.
Left a few minor comments, but otherwise LGTM. Thanks @abhishekrb19!
FrankChen021
left a comment
There was a problem hiding this comment.
Reviewed 5 of 20 changed files (6 files total inspected) for this reply-needed follow-up.
Confirmed the two existing review follow-ups are addressed in current code: observed partition values are now sorted deterministically with explicit null handling, and restart-spanned segments now use empty-filter StreamRangeShardSpec behavior with relevant task-runner and publisher-helper test coverage.
No inline reply looks necessary.
This is an automated review by Codex GPT-5.5
|
Thanks for the reviews @aho135 @FrankChen021! I also went ahead and rename the shard spec type from |
Relates to #12929
Streaming-published segments currently have numbered shard specs, which aren't prunable by design. Compaction must reindex the data with range or hashed partition strategy once the data is handed off — even if the topic is partitioned, which
is easy to do with multiple supervisors. For multi-tenant datasources this means every tenant-filtered query hits every recent segment regardless of the partitioning strategy for numbered shard specs.
This PR lets streaming tasks record, per published segment, the distinct values observed for a configured set of dimensions, and declare them on a new shard spec so the broker can prune near-realtime data without waiting for compaction to
reindex handed off segments. Concurrent compaction cannot always keep up with the incoming data and additionally the compaction process itself takes time to reindex; so the benefits of range or hashed shard specs may not be fully realized for
however long it takes to reindex (30-45 minutes in our case), and doesn't help with high concurrent query workloads that are only querying more recent data.
So this PR adds a way to publish prunable shards right off the bat when they're handed off by streaming tasks, if configured. This functionality is opt-in, Kafka-only, and disabled by default.
Design
DimensionValueSetShardSpec(type:"dim_value_set") — extendsNumberedShardSpec(behaves as a normal append segment) plus a partitionDimensionValues map (dimension → observed values). possibleInDomain() prunes a segment when the queryconstrains a declared dimension and none of its values intersect the domain; a dimension not in partitionDimensionValues is never pruned on. Set-based (not min/max), so it prunes precisely for sparse values and tolerates overlapping value
sets across tasks/restarts.
shard specs (this caveat can be addressed with a guard rail noted below).
partitionDimensionsis set, the task accumulates observed values per segment and stamps each at publish.Configuration
New optional
tuningConfigfieldstreamingPartitionsSpecon the Kafka supervisor/task (default null), with apartitionDimensionslist. When unset, behavior is unchanged. Documented in docs/ingestion/kafka-ingestion.md.Compatibility
Backward-compatible and opt-in. But dim_value_set is a new core ShardSpec type with no defaultImpl fallback, so it is not forward-compatible: upgrade all services before enabling streamingPartitionsSpec, and note that once dim_value_set
segments are published, downgrade isn't supported until they're compacted away.
Results
Tested in a cluster, where I saw up to ~40% reduction in segment scans on the historicals for a few low to medium cardinal partition dimensions. In a follow-up, I want to extend this to also prune tasks, for reduced peon buffers and better
query performance at the task layer.
Caveats
There's currently no limit on the number of observed values stamped into a segment's partitionDimensionValues. It may make sense, in a follow-up, to add a configurable guardrail that falls back to NumberedShardSpec when the count exceeds a
threshold, so shard specs don't get bloated.
Release note
Kafka ingestion can now publish segments that the broker prunes at query time, without waiting for compaction. Set tuningConfig.streamingPartitionsSpec.partitionDimensions to a list of low-to-medium cardinality dimensions; each task records
the distinct values it observes per dimension and stamps them onto a new dim_value_set shard spec. Queries that filter on a declared dimension then skip segments whose values can't match. The feature is opt-in, Kafka-only, and disabled by
default; when unset, behavior is unchanged.
Compatibility:
dim_value_setis a new core shard spec type with no fallback, so it is not forward-compatible. Upgrade all services before enabling streamingPartitionsSpec. Once dim_value_set segments are published, downgrade is unsupporteduntil they are compacted away or streamingPartitionsSpec is removed.
This PR has: